import ma.const
import os
import ma.fs.dfs._hdfs
import sys
import time
from ma.commons.core.maptipinfo import MapTIPInfo
from ma.commons.core.reducetipinfo import ReduceTIPInfo
from ma.commons.core.maptaskrunner import MapTaskRunner
from ma.commons.core.reducetaskrunner import ReduceTaskRunner

# ---------------------------------------------------------------------------
# Job configuration: a single hard-coded job with a fixed number of input
# files (file indices 0 .. no_input_files-1 are looked up in the jobs XML).
# ---------------------------------------------------------------------------
job_id, no_input_files = 2, 4

# Local temp directories for this job's input and output, as configured in
# the jobs XML. Each gets a trailing os.sep so a file name can be appended
# directly. (output_dest_dir is not referenced elsewhere in this script.)
input_dest_dir, output_dest_dir = (
    ma.const.JobsXmlData.get_filepath_str_data(xml_key, job_id) + os.sep
    for xml_key in (ma.const.xml_local_input_temp_dir,
                    ma.const.xml_local_output_temp_dir)
)

# User-supplied map implementation, given by module path and class name and
# handed to MapTaskRunner; the reducer class is imported directly.
user_map_module, user_map_class = "ma.codes.mapall", "Map"
from ma.codes.reduce import Reduce

# Shared state passed into the map/reduce machinery:
#   threads           - handed to MapTaskRunner and its startTask()
#   key_value_dict    - filled by the map tasks, consumed by the reduce phase
#   answer_value_dict - filled by Reduce, printed as the final answers
#   maptip_infos      - not referenced elsewhere in this script
threads = []
key_value_dict, answer_value_dict, maptip_infos = {}, {}, {}

# HDFS client built from the host/port configured in the XML settings.
host = ma.const.XmlData.get_str_data(ma.const.xml_hdfs_host)
port = ma.const.XmlData.get_int_data(ma.const.xml_hdfs_port)
hdfs = ma.fs.dfs._hdfs.HDFS(host, port)

print('////////////// MAP PART //////////////////')

start_time = time.time()

# The DFS directory holding this job's input depends only on job_id, so it
# is looked up once rather than on every iteration.
dfs_input_dir = ma.const.JobsXmlData.get_dfs_filepath_str_data(ma.const.xml_dfs_path_job_input, job_id)

for file_no in range(no_input_files):
    filename = ma.const.JobsXmlData.get_str_data(ma.const.xml_map_input_filename, job_id, file_no)
    dfs_input_filepath = dfs_input_dir + ma.const.dfs_dir_sep + filename

    input_dest_filepath = input_dest_dir + os.path.basename(dfs_input_filepath)

    # Remove a leftover local copy before fetching the file from DFS again.
    if os.path.isfile(input_dest_filepath):
        print('Deleting old local file: %s' % input_dest_filepath)
        os.remove(input_dest_filepath)

    ret = hdfs.copy_file_to_local(dfs_input_filepath, input_dest_dir, job_id)

    if not ret:
        # Report on stderr and exit with a non-zero status so that callers
        # (shells, schedulers) can detect the failure; a bare sys.exit()
        # would exit with status 0, i.e. "success".
        print('Couldn\'t copy file', dfs_input_filepath, file=sys.stderr)
        sys.exit(1)

    # One map task per input file; the runner is given key_value_dict,
    # which the reduce phase below reads.
    maptip_info = MapTIPInfo(file_no, job_id, [(filename, 0, 0)])
    task_runner = MapTaskRunner(threads, maptip_info, user_map_module, user_map_class, key_value_dict)

    print('Mapping input', file_no)
    task_runner.startTask(maptip_info, threads)
    print('Mapped input', file_no)

print('////////////// REDUCE PART //////////////////')

print('Keys present for reduce:', list(key_value_dict))

# Reduce every key produced by the map phase. Reduce exposes start()/join()
# (thread-like, presumably); joining right after starting means keys are
# handled one after another, in dict iteration order. Results are written
# by Reduce into answer_value_dict.
for key, list_of_values in key_value_dict.items():
    print('Reducing key:', key)
    print(list_of_values)
    reducer = Reduce(key, list_of_values, answer_value_dict)
    reducer.start()
    reducer.join()
    print('Reduced key:', key)

print('////////////// ANSWER PART //////////////////')

print('Final Answers:')
# Print each reduced result collected in answer_value_dict.
for key, answer in answer_value_dict.items():
    print('MR output for key', key, ':', answer)

end_time = time.time()

# Wall-clock seconds since start_time, which is recorded at the start of the
# map phase (HDFS client setup happens before that and is not included).
print('Total time in seconds:', end_time - start_time)
